Big Data and Analytics Streaming Data এর জন্য DataFrame এবং Dataset API ব্যবহার গাইড ও নোট

298

Apache Spark Streaming হলো একটি শক্তিশালী লাইব্রেরি, যা রিয়েল-টাইম ডেটা স্ট্রিমিং প্রসেসিংয়ের জন্য ব্যবহৃত হয়। Spark SQL DataFrame এবং Dataset API-এর মাধ্যমে Spark Streaming-এ সহজে ডেটা প্রসেস করা সম্ভব হয়। Spark SQL DataFrame এবং Dataset API ব্যবহার করলে স্ট্রিমিং ডেটাকে আরও কার্যকরভাবে বিশ্লেষণ করা যায় এবং জটিল SQL কোয়ারি অথবা ট্রান্সফর্মেশন প্রয়োগ করা যায়।

এখানে, আমরা Spark SQL-এ Streaming Data এর জন্য DataFrame এবং Dataset API ব্যবহারের কিছু মূল ধারণা এবং উদাহরণ দেখব।


1. Spark Streaming DataFrame এবং Dataset API এর জন্য সেটআপ

Spark SQL এ স্ট্রিমিং ডেটা প্রসেস করার জন্য প্রথমে SparkSession তৈরি করতে হয়। SparkSession সেটআপ করার মাধ্যমে আমরা DataFrame এবং Dataset API-এ রিয়েল-টাইম ডেটা প্রসেসিং করতে সক্ষম হব।

SparkSession তৈরি করা:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Streaming DataFrame and Dataset Example") \
    .getOrCreate()

এটি SparkSession তৈরি করবে, যা ডেটা লোড, প্রসেস এবং SQL কোয়ারি এক্সিকিউশন সমর্থন করে।


2. Streaming DataFrame API ব্যবহার

Spark SQL-এ Streaming DataFrame ব্যবহার করার জন্য readStream ফাংশন ব্যবহার করা হয়। এটি স্ট্রিমিং ডেটা সোর্স (যেমন Kafka, Socket, File) থেকে ডেটা রিয়েল-টাইমে লোড করে এবং পরে তা প্রসেস করা যায়। এখানে আমরা Socket সোর্স থেকে ডেটা পড়ব এবং SQL বা DataFrame ট্রান্সফর্মেশন প্রয়োগ করব।

উদাহরণ: Socket থেকে Streaming Data লোড করা

# Socket সোর্স থেকে স্ট্রিমিং ডেটা লোড করা
streaming_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Streaming DataFrame দেখতে
streaming_df.printSchema()

এখানে:

  • readStream ব্যবহার করা হয়েছে, যা Socket থেকে ডেটা পড়বে এবং একটি স্ট্রিমিং DataFrame তৈরি করবে।

DataFrame Transformation এবং Output

# ডেটার উপর ট্রান্সফর্মেশন প্রয়োগ
from pyspark.sql.functions import explode, split

# স্ট্রিমিং ডেটাতে ট্রান্সফর্মেশন (স্পেস দিয়ে শব্দ বিভাজন)
transformed_df = streaming_df.select(
    explode(split(streaming_df["value"], " ")).alias("word")
)

# Output দেখতে
query = transformed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

এখানে:

  • split() এবং explode() ব্যবহার করা হয়েছে, যাতে স্পেস দিয়ে বিভক্ত শব্দগুলোকে আলাদা আলাদা রেকর্ডে রূপান্তরিত করা যায়।
  • writeStream ব্যবহার করে স্ট্রিমিং আউটপুট কনসোলে দেখানো হচ্ছে।

এভাবে, Spark SQL DataFrame API ব্যবহার করে স্ট্রিমিং ডেটা সহজে প্রসেস করা যায়।


3. Dataset API ব্যবহার করে Streaming Data প্রসেসিং

Dataset API DataFrame API-এর উপর ভিত্তি করে তৈরি, তবে এটি টাইপ সেফ। Dataset API ব্যবহার করলে স্ট্রিমিং ডেটাতে আরো শক্তিশালী প্রোগ্রামিং ফিচার পাওয়া যায়। এখানে আমরা Structured Streaming এবং Dataset API ব্যবহার করে Streaming Data প্রসেস করব।

উদাহরণ: Dataset API ব্যবহার করে স্ট্রিমিং ডেটা প্রসেস করা

from pyspark.sql.functions import col

# ডেটাকে Dataset হিসেবে প্রসেস করা
dataset_df = streaming_df.select(col("value").cast("string"))

# স্ট্রিমিং ডেটার উপর Dataset API ব্যবহার
processed_dataset = dataset_df.filter(dataset_df["value"].contains("Spark"))

# স্ট্রিমিং আউটপুট দেখতে
query = processed_dataset.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

এখানে:

  • Dataset API ব্যবহার করে স্ট্রিমিং ডেটা ফিল্টার করা হয়েছে যেখানে "Spark" শব্দটি আছে।
  • writeStream ব্যবহার করে স্ট্রিমিং আউটপুট কনসোলে দেখানো হচ্ছে।

4. Kafka ব্যবহার করে Streaming Data প্রসেস করা

Spark Streaming এ Kafka এর সাথে ইন্টিগ্রেশন খুবই জনপ্রিয়। Kafka থেকে ডেটা নিয়ে তা প্রসেস করা এবং SQL বা DataFrame API প্রয়োগ করা খুবই সহজ।

উদাহরণ: Kafka থেকে Streaming Data লোড করা

# Kafka থেকে স্ট্রিমিং ডেটা লোড করা
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_topic") \
    .load()

# Kafka ডেটা সিলেক্ট করা
kafka_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# প্রক্রিয়া করা
processed_kafka_df = kafka_df.filter(kafka_df["value"].contains("Spark"))

# আউটপুট দেখানো
query = processed_kafka_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

এখানে:

  • Kafka থেকে স্ট্রিমিং ডেটা পড়া হচ্ছে এবং তা প্রসেস করা হচ্ছে।
  • selectExpr() ব্যবহার করে Kafka ডেটার key এবং value কে স্ট্রিং হিসেবে কাস্ট করা হয়েছে।

5. Windowing Functions

Spark SQL স্ট্রিমিংয়ের জন্য windowing functions খুবই কার্যকরী। এগুলি ব্যবহার করে একটি নির্দিষ্ট সময়সীমার মধ্যে স্ট্রিমিং ডেটা এগ্রিগেট বা প্রক্রিয়াজাত করা যায়।

উদাহরণ: Time Windowing

from pyspark.sql.functions import window

# Time window ব্যবহার করে স্ট্রিমিং ডেটা এগ্রিগেট করা
windowed_df = streaming_df.groupBy(window(streaming_df["timestamp"], "10 minutes")).count()

# আউটপুট দেখানো
query = windowed_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

এখানে:

  • window() ফাংশন ব্যবহার করে ১০ মিনিটের একটি সময়সীমার মধ্যে ডেটা গ্রুপ করা হয়েছে এবং count() ফাংশন ব্যবহার করে গ্রুপ করা ডেটার পরিমাণ বের করা হয়েছে।

সারাংশ

Spark SQL-এর DataFrame এবং Dataset API ব্যবহার করে Structured Streaming ডেটা প্রসেসিং খুবই সহজ এবং শক্তিশালী। readStream() ব্যবহার করে স্ট্রিমিং ডেটা লোড করা যায়, writeStream() ব্যবহার করে আউটপুট ডেটা প্রদর্শন করা যায়, এবং windowing functions দিয়ে ডেটাকে নির্দিষ্ট সময়সীমার মধ্যে গ্রুপ বা এগ্রিগেট করা যায়। এই API গুলি আপনাকে রিয়েল-টাইম ডেটা প্রসেসিং এবং বিশ্লেষণ করার জন্য প্রয়োজনীয় সমস্ত ফিচার প্রদান করে, যা ব্যবসায়িক সিদ্ধান্ত নেওয়া এবং ডেটা ইন্টিগ্রেশন পিপলাইনের জন্য অপরিহার্য।

Content added By
Promotion

Are you sure to start over?

Loading...